'use strict';

import stream from 'stream';
import utils from '../utils.js';
import throttle from './throttle.js';
import speedometer from './speedometer.js';

const kInternals = Symbol('internals');

class AxiosTransformStream extends stream.Transform {
    constructor(options) {
        options = utils.toFlatObject(options, {
            maxRate: 0,
            chunkSize: 64 * 1024,
            minChunkSize: 100,
            timeWindow: 500,
            ticksRate: 2,
            samplesCount: 15
        }, null, (prop, source) => {
            return !utils.isUndefined(source[prop]);
        });

        super({
            readableHighWaterMark: options.chunkSize
        });

        const self = this;

        const internals = this[kInternals] = {
            length: options.length,
            timeWindow: options.timeWindow,
            ticksRate: options.ticksRate,
            chunkSize: options.chunkSize,
            maxRate: options.maxRate,
            minChunkSize: options.minChunkSize,
            bytesSeen: 0,
            isCaptured: false,
            notifiedBytesLoaded: 0,
            ts: Date.now(),
            bytes: 0,
            onReadCallback: null
        };

        const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);

        this.on('newListener', event => {
            if (event === 'progress') {
                if (!internals.isCaptured) {
                    internals.isCaptured = true;
                }
            }
        });

        let bytesNotified = 0;

        internals.updateProgress = throttle(function throttledHandler() {
            const totalBytes = internals.length;
            const bytesTransferred = internals.bytesSeen;
            const progressBytes = bytesTransferred - bytesNotified;
            if (!progressBytes || self.destroyed) return;

            const rate = _speedometer(progressBytes);

            bytesNotified = bytesTransferred;

            process.nextTick(() => {
                self.emit('progress', {
                    loaded: bytesTransferred,
                    total: totalBytes,
                    progress: totalBytes ? (bytesTransferred / totalBytes) : undefined,
                    bytes: progressBytes,
                    rate: rate ? rate : undefined,
                    estimated: rate && totalBytes && bytesTransferred <= totalBytes ?
                        (totalBytes - bytesTransferred) / rate : undefined,
                    lengthComputable: totalBytes != null
                });
            });
        }, internals.ticksRate);

        const onFinish = () => {
            internals.updateProgress.call(true);
        };

        this.once('end', onFinish);
        this.once('error', onFinish);
    }

    _read(size) {
        const internals = this[kInternals];

        if (internals.onReadCallback) {
            internals.onReadCallback();
        }

        return super._read(size);
    }

    _transform(chunk, encoding, callback) {
        const self = this;
        const internals = this[kInternals];
        const maxRate = internals.maxRate;

        const readableHighWaterMark = this.readableHighWaterMark;

        const timeWindow = internals.timeWindow;

        const divider = 1000 / timeWindow;
        const bytesThreshold = (maxRate / divider);
        const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

        function pushChunk(_chunk, _callback) {
            const bytes = Buffer.byteLength(_chunk);
            internals.bytesSeen += bytes;
            internals.bytes += bytes;

            if (internals.isCaptured) {
                internals.updateProgress();
            }

            if (self.push(_chunk)) {
                process.nextTick(_callback);
            } else {
                internals.onReadCallback = () => {
                    internals.onReadCallback = null;
                    process.nextTick(_callback);
                };
            }
        }

        const transformChunk = (_chunk, _callback) => {
            const chunkSize = Buffer.byteLength(_chunk);
            let chunkRemainder = null;
            let maxChunkSize = readableHighWaterMark;
            let bytesLeft;
            let passed = 0;

            if (maxRate) {
                const now = Date.now();

                if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
                    internals.ts = now;
                    bytesLeft = bytesThreshold - internals.bytes;
                    internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
                    passed = 0;
                }

                bytesLeft = bytesThreshold - internals.bytes;
            }

            if (maxRate) {
                if (bytesLeft <= 0) {
                    // next time window
                    return setTimeout(() => {
                        _callback(null, _chunk);
                    }, timeWindow - passed);
                }

                if (bytesLeft < maxChunkSize) {
                    maxChunkSize = bytesLeft;
                }
            }

            if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
                chunkRemainder = _chunk.subarray(maxChunkSize);
                _chunk = _chunk.subarray(0, maxChunkSize);
            }

            pushChunk(_chunk, chunkRemainder ? () => {
                process.nextTick(_callback, null, chunkRemainder);
            } : _callback);
        };

        transformChunk(chunk, function transformNextChunk(err, _chunk) {
            if (err) {
                return callback(err);
            }

            if (_chunk) {
                transformChunk(_chunk, transformNextChunk);
            } else {
                callback(null);
            }
        });
    }

    setLength(length) {
        this[kInternals].length = +length;
        return this;
    }
}

export default AxiosTransformStream;
